/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.geode.distributed.internal;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Properties;
import java.util.Set;

import org.apache.logging.log4j.Logger;

import org.apache.geode.DataSerializer;
import org.apache.geode.Instantiator;
//import java.util.*;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.InternalDataSerializer.SerializerAttributesHolder;
import org.apache.geode.internal.InternalInstantiator;
import org.apache.geode.internal.InternalInstantiator.InstantiatorAttributesHolder;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LogMarker;

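/**
 * A message that is sent back to a single distribution manager (the
 * recipient given to the constructor) in response to its startup request.
 * It carries an optional rejection message together with the responder's
 * registered serializers and instantiators, its network interfaces, its
 * distributed system id and its redundancy zone.
 */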
public class StartupResponseMessage extends HighPriorityDistributionMessage implements AdminMessageType {
  private static final Logger logger = LogService.getLogger();

  /**
   * Reason the responder gives for rejecting the startup request;
   * {@code null} means no rejection was reported.
   */
  protected String rejectionMessage;

  /** Id used on the receiving side to look up the waiting reply processor. */
  protected int processorId;

  /** When {@code true} the response is not counted as an acceptance by the receiver. */
  protected boolean responderIsAdmin;

  /** Network addresses of the responder, as returned by {@code StartupMessage.getMyAddresses}. */
  protected Set interfaces;

  /** Distributed system id reported by the responder's distribution manager. */
  protected int distributedSystemId;

  /** Redundancy zone reported by the responder's distribution manager for itself. */
  protected String redundancyZone;

  /*
   * To fix B39705, the registered DataSerializer and Instantiator information
   * is carried as plain ids and class names. The arrays within each group are
   * parallel: index i of every array describes the same registration.
   */
  protected int [] serializerIds = null;
  protected String[] serializerClasseNames = null;
  protected String[] instantiatorClasseNames = null;
  protected String[] instantiatedClasseNames = null;
  protected int [] instantiatorIds = null;

  /** Problems collected for this message; logged at debug level by {@link #process}. */
  protected transient StringBuffer fromDataProblems;

  /**
   * Creates an empty message. All fields keep their default values until
   * {@link #fromData(DataInput)} fills them in.
   */
  public StartupResponseMessage() {
  }

  /**
   * Builds a response addressed to {@code recipient} and captures the local
   * state that is shipped with it.
   *
   * @param dm the local distribution manager; supplies the addresses,
   *        distributed system id and redundancy zone
   * @param processorId id of the reply processor the response is meant for
   * @param recipient the member the response is sent to
   * @param rejectionMessage the rejection reason, or {@code null} for none
   * @param responderIsAdmin whether the responder is an admin member
   */
  StartupResponseMessage(DistributionManager dm,
                         int processorId,
                         InternalDistributedMember recipient,
                         String rejectionMessage,
                         boolean responderIsAdmin) {
    setRecipient(recipient);
    setProcessorId(processorId);
    this.rejectionMessage = rejectionMessage;
    this.responderIsAdmin = responderIsAdmin;

    // Note that if we can't read our network addresses, the peer will reject us.
    this.interfaces = StartupMessage.getMyAddresses(dm);
    this.distributedSystemId = dm.getDistributedSystemId();
    this.redundancyZone = dm.getRedundancyZone(dm.getId());

    // Fix for B39705 / #43677: snapshot the registered instantiators. An entry
    // is either a live Instantiator or an InstantiatorAttributesHolder.
    Object[] instantiators = InternalInstantiator.getInstantiatorsForSerialization();
    this.instantiatorClasseNames = new String[instantiators.length];
    this.instantiatedClasseNames = new String[instantiators.length];
    this.instantiatorIds = new int[instantiators.length];
    for (int i = 0; i < instantiators.length; i++) {
      if (instantiators[i] instanceof Instantiator) {
        Instantiator inst = (Instantiator) instantiators[i];
        this.instantiatorClasseNames[i] = inst.getClass().getName();
        this.instantiatedClasseNames[i] = inst.getInstantiatedClass().getName();
        this.instantiatorIds[i] = inst.getId();
      } else {
        InstantiatorAttributesHolder holder = (InstantiatorAttributesHolder) instantiators[i];
        this.instantiatorClasseNames[i] = holder.getInstantiatorClassName();
        this.instantiatedClasseNames[i] = holder.getInstantiatedClassName();
        this.instantiatorIds[i] = holder.getId();
      }
    }

    // Snapshot the registered serializers as (id, class name) pairs.
    SerializerAttributesHolder[] sahs = InternalDataSerializer.getSerializersForDistribution();
    this.serializerIds = new int[sahs.length];
    this.serializerClasseNames = new String[sahs.length];
    for (int i = 0; i < sahs.length; i++) {
      this.serializerIds[i] = sahs[i].getId();
      this.serializerClasseNames[i] = sahs[i].getClassName();
    }
  }

  /**
   * set the processor id for this message
   */
  public void setProcessorId(int processorId) {
    this.processorId = processorId;
  }

  /** replymessages are always processed in-line */
  @Override
  public boolean getInlineProcess() {
    return true;
  }

  @Override
  public boolean sendViaUDP() {
    return true;
  }

  /**
   * Applies the contents of this response to the given distribution manager:
   * records the sender's addresses, distributed system id and redundancy
   * zone, registers the serializers and instantiators it carries, reports
   * the response to the manager and finally notifies the waiting reply
   * processor, if one is still registered under {@link #processorId}.
   *
   * This method is invoked on the receiver side
   */
  @Override
  protected void process(DistributionManager dm) {
    // A response without any addresses is tolerated; there is simply nothing to record.
    if (this.interfaces != null && !this.interfaces.isEmpty()) {
      dm.setEquivalentHosts(this.interfaces);
    }
    dm.setDistributedSystemId(this.distributedSystemId);
    dm.setRedundancyZone(getSender(), this.redundancyZone);

    // Log any collected problems before attempting the registrations.
    if (this.fromDataProblems != null && logger.isDebugEnabled()) {
      logger.debug(this.fromDataProblems);
    }

    if (this.serializerIds != null) {
      for (int i = 0; i < serializerIds.length; i++) {
        String cName = this.serializerClasseNames[i];
        if (cName != null) {
          InternalDataSerializer.register(cName, false, null, null, serializerIds[i]);
        }
      }
    }

    if (this.instantiatorIds != null) {
      // Process the Instantiator registrations; entries missing either class name are skipped.
      for (int i = 0; i < instantiatorIds.length; i++) {
        String instantiatorClassName = instantiatorClasseNames[i];
        String instantiatedClassName = instantiatedClasseNames[i];
        int id = instantiatorIds[i];
        if ((instantiatorClassName != null) && (instantiatedClassName != null)) {
          InternalInstantiator.register(instantiatorClassName, instantiatedClassName, id, false);
        }
      }
    }

    dm.processStartupResponse(this.sender, this.rejectionMessage);

    StartupMessageReplyProcessor proc =
        (StartupMessageReplyProcessor) ReplyProcessor21.getProcessor(processorId);
    if (proc == null) {
      return;
    }

    if (this.rejectionMessage != null) {
      // there's no reason to wait for other responses
      proc.setReceivedRejectionMessage(true);
      return;
    }

    if (!this.responderIsAdmin) {
      proc.setReceivedAcceptance(true);
    }
    proc.process(this);
    if (logger.isTraceEnabled(LogMarker.DM)) {
      logger.trace(LogMarker.DM, "{} Processed {}", proc, this);
    }
  }

  @Override
  public int getDSFID() {
    return STARTUP_RESPONSE_MESSAGE;
  }

  /**
   * Appends {@code s}, followed by a line separator, to
   * {@link #fromDataProblems}, creating the buffer on first use. Nothing in
   * this class currently calls it.
   */
  private void fromDataProblem(String s) {
    if (this.fromDataProblems == null) {
      this.fromDataProblems = new StringBuffer();
    }

    this.fromDataProblems.append(s);
    this.fromDataProblems.append(System.getProperty("line.separator", "\n"));
  }

  @Override
  public Version[] getSerializationVersions() {
    return null;
  }

  /**
   * Writes this message to {@code out}. The field order here must stay in
   * step with {@link #fromData(DataInput)}.
   *
   * @throws IOException if writing to {@code out} fails
   */
  @Override
  public void toData(DataOutput out) throws IOException {
    super.toData(out);

    out.writeInt(processorId);
    DataSerializer.writeString(this.rejectionMessage, out);
    out.writeBoolean(this.responderIsAdmin);

    // Send a description of all of the DataSerializers and
    // Instantiators that have been registered
    out.writeInt(serializerIds.length);
    for (int i = 0; i < serializerIds.length; i++) {
      DataSerializer.writeNonPrimitiveClassName(serializerClasseNames[i], out);
      out.writeInt(serializerIds[i]);
    }

    out.writeInt(this.instantiatorIds.length);
    for (int i = 0; i < instantiatorIds.length; i++) {
      DataSerializer.writeNonPrimitiveClassName(this.instantiatorClasseNames[i], out);
      DataSerializer.writeNonPrimitiveClassName(this.instantiatedClasseNames[i], out);
      out.writeInt(this.instantiatorIds[i]);
    }

    DataSerializer.writeObject(interfaces, out);
    out.writeInt(distributedSystemId);
    DataSerializer.writeString(redundancyZone, out);
  }

  /**
   * Reads this message from {@code in}, in the order written by
   * {@link #toData(DataOutput)}.
   *
   * @throws IOException if reading from {@code in} fails
   * @throws ClassNotFoundException if a class needed during reading cannot be found
   */
  @Override
  public void fromData(DataInput in)
    throws IOException, ClassNotFoundException {

    super.fromData(in);

    this.processorId = in.readInt();
    this.rejectionMessage = DataSerializer.readString(in);
    this.responderIsAdmin = in.readBoolean();

    int serializerCount = in.readInt();
    this.serializerClasseNames = new String[serializerCount];
    this.serializerIds = new int[serializerCount];
    for (int i = 0; i < serializerCount; i++) {
      // Read the name and id back to back. A failed name read propagates
      // as-is; no further read is attempted on the stream afterwards, so the
      // original exception can never be replaced by a secondary one.
      serializerClasseNames[i] = DataSerializer.readNonPrimitiveClassName(in);
      serializerIds[i] = in.readInt();
    }

    // Fix for B39705 : Deserialize the instantiators in the field variables.
    int instantiatorCount = in.readInt();
    instantiatorClasseNames = new String[instantiatorCount];
    instantiatedClasseNames = new String[instantiatorCount];
    instantiatorIds = new int[instantiatorCount];
    for (int i = 0; i < instantiatorCount; i++) {
      instantiatorClasseNames[i] = DataSerializer.readNonPrimitiveClassName(in);
      instantiatedClasseNames[i] = DataSerializer.readNonPrimitiveClassName(in);
      instantiatorIds[i] = in.readInt();
    }

    interfaces = (Set) DataSerializer.readObject(in);
    distributedSystemId = in.readInt();
    redundancyZone = DataSerializer.readString(in);
  }

  @Override
  public String toString() {
    return "StartupResponse: rejectionMessage="
          + this.rejectionMessage + " processor=" + processorId + " responderIsAdmin=" + this.responderIsAdmin + " distributed system id = " +this.distributedSystemId;
  }
}
